package com.shujia.sink;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class mysqlSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> students = env.readTextFile("flink/data/students.csv");

        //将结果保存到数据库中
        SinkFunction<String> jdbcSink = JdbcSink.sink(
                "replace into student values (?,?,?,?,?)",
                (ps, line) -> {
                    //ps是PreparedStatement
                    //line是一行数据
                    String[] split = line.split(",");
                    ps.setString(1, split[0]);
                    ps.setString(2, split[1]);
                    ps.setInt(3, Integer.parseInt(split[2]));
                    ps.setString(4, split[3]);
                    ps.setString(5, split[4]);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.205.129:3306/bigdata27?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build());

        //使用jdbc sink
        students.addSink(jdbcSink);
        env.execute();

    }
}
